package com.dec.kafka.offset.reset;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.util.*;

public class ResetMain2 {

    static Properties props = new Properties();

    private static final Logger LOGGER = LoggerFactory.getLogger(ResetMain2.class);

    private static String bootstrap_servers = "kafka001:6667,kafka002:6667,kafka003:6667,kafka004:6667,kafka005:6667";
    private static String offset_reset_config = "earliest";

    private static List<String[]> repairs = new ArrayList<String[]>();

    public static void main(String[] args) throws Exception {

        loadConfig("/home/hdfs/soft/IdeaProjects/dec-kks-etl/dec-kafka-offset-reset/src/main/resources");

        for (int i = 0; i < repairs.size(); i++) {
            String[] str = repairs.get(i);

            String topic = str[0];
            String groupId = str[1];
            int partition = Integer.parseInt(str[2]);
            int offset = Integer.parseInt(str[3]);

            LOGGER.info("修复对象：主题{}", topic);
            LOGGER.info("修复对象：消费組ID{}", groupId);
            LOGGER.info("修复对象：分区ID{}", partition);
            LOGGER.info("修复对象：偏移量{}", offset);
            try {
                LOGGER.info("修复开始：主题{}，消费組ID{}，分区ID{},偏移量{}", topic, groupId, partition, offset);
                KafkaConsumer<String, String> consumer = initConsumer(offset_reset_config, groupId, bootstrap_servers);
                reset(consumer, topic, groupId, partition, offset);
                LOGGER.info("修复完成：主题{}，消费組ID{}，分区ID{},偏移量{}", topic, groupId, partition, offset);
            } catch (Exception e) {
                LOGGER.error("修复失败：主题{}，消费組ID{}，分区ID{},偏移量{},\n异常信息：{}", topic, groupId, partition, offset,e);
            }
        }

    }

    public static void reset(KafkaConsumer<String, String> consumer,
                             String topic,
                             String group_id,
                             int partition,
                             int offset) throws InterruptedException {
        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<TopicPartition, OffsetAndMetadata>();
        TopicPartition tp = new TopicPartition(topic, partition);
        OffsetAndMetadata om = new OffsetAndMetadata(offset, group_id);
        offsetAndMetadataMap.put(tp, om);
        consumer.commitSync(offsetAndMetadataMap);
    }

    public static KafkaConsumer<String, String> initConsumer(String offset_reset_config,
                                                             String group_id,
                                                             String bootstrap_servers) {
        Properties config = new Properties();
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset_reset_config);
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);

        return consumer;
    }

    public static void loadConfig(String confPath) throws Exception {
        try {
            if (!confPath.endsWith(File.separator)) {
                confPath = confPath + File.separator;
            }
            confPath = confPath + "reset.properties";
            props.load(new FileInputStream(confPath));

            bootstrap_servers = props.getProperty("bootstrap_servers");
            offset_reset_config = props.getProperty("offset_reset_config");

            Iterator<Map.Entry<Object, Object>> it = props.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Object, Object> e = it.next();
                String key = e.getKey().toString();
                String value = e.getValue().toString();
                if (key.startsWith("repair")) {
                    repairs.add(value.split(","));
                }
            }

        } catch (Exception e) {
            LOGGER.error("配置文件加载路径不正确!" + confPath, e);
            throw new Exception("配置文件加载路径不正确!" + confPath, e);
        }
    }
}
